package org.zjvis.datascience.web.controller;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.mayabot.nlp.common.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.zjvis.datascience.common.annotation.PipelineAuth;
import org.zjvis.datascience.common.annotation.ProjectAuth;
import org.zjvis.datascience.common.dto.PipelineDTO;
import org.zjvis.datascience.common.dto.user.UserDTO;
import org.zjvis.datascience.common.enums.PivotTableMethodEnum;
import org.zjvis.datascience.common.enums.ProjectAuthEnum;
import org.zjvis.datascience.common.enums.TaskTypeEnum;
import org.zjvis.datascience.common.exception.DataScienceException;
import org.zjvis.datascience.common.model.ApiResult;
import org.zjvis.datascience.common.model.ApiResultCode;
import org.zjvis.datascience.common.util.JwtUtil;
import org.zjvis.datascience.common.vo.PipelineVO;
import org.zjvis.datascience.service.PipelineService;
import org.zjvis.datascience.service.TaskService;
import org.zjvis.datascience.service.UserService;
import org.zjvis.datascience.service.dag.DAGScheduler;

/**
 * @description Pipeline (data-flow graph) API controller; serves the endpoints under
 *              {@code /pipeline}.
 * @date 2021-11-22
 */
@RequestMapping("/pipeline")
@RestController
@Api(tags = "pipeline", description = "数据流图接口")
@Validated
public class PipelineController {

    private static final Logger logger = LoggerFactory.getLogger(PipelineController.class);

    @Autowired
    private PipelineService pipelineService;

    @Autowired
    private DAGScheduler dagScheduler;

    @Autowired
    private UserService userService;

    @Autowired
    private Environment environment;

    /**
     * Creates a new pipeline.
     *
     * <p>Requires non-empty {@code data} and {@code name} and a non-negative {@code projectId}.
     * When the request carries no positive {@code userId}, the current user's id (from
     * {@link JwtUtil#getCurrentUserId()}) is used instead.
     *
     * @param request the servlet request (unused)
     * @param vo      the pipeline to create
     * @return the value returned by {@code pipelineService.save}, or {@code PARAM_ERROR} when
     *         validation fails
     */
    @PostMapping(value = "/save")
    @ResponseBody
    @ApiOperation(value = "新增数据流图", notes = "新增数据流图")
    public ApiResult<Long> save(HttpServletRequest request,
                                @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) PipelineVO vo) {
        if (vo.getData() == null || vo.getData().isEmpty()) {
            logger.warn("API /pipeline/save failed, since {}", "data is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        // NOTE(review): every other id check in this class rejects 0 (<= 0L); confirm whether
        // 0 is really meant to be an acceptable projectId here.
        if (vo.getProjectId() == null || vo.getProjectId() < 0L) {
            logger.warn("API /pipeline/save failed, since {}", "projectId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        if (vo.getName() == null || vo.getName().isEmpty()) {
            logger.warn("API /pipeline/save failed, since {}", "name is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        if (vo.getUserId() == null || vo.getUserId() <= 0L) {
            // Fall back to the authenticated user when the client did not supply an owner.
            Long userId = JwtUtil.getCurrentUserId();
            logger.debug("API /pipeline/save: no userId supplied, using current userId={}", userId);
            vo.setUserId(userId);
        }
        Long id = pipelineService.save(vo.toPipeline());
        return ApiResult.valueOf(id);
    }

    /**
     * Updates an existing pipeline.
     *
     * @param request the servlet request (unused)
     * @param vo      the pipeline to update; needs a positive {@code id} and a non-empty
     *                {@code name}
     * @return the pipeline id taken from the request, or {@code PARAM_ERROR} when validation
     *         fails
     */
    @PostMapping(value = "/update")
    @ResponseBody
    @ApiOperation(value = "更新数据流图", notes = "更新数据流图")
    public ApiResult<Long> update(HttpServletRequest request,
                                  @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) PipelineVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /pipeline/update failed, since {}", "id is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        if (vo.getName() == null || vo.getName().isEmpty()) {
            logger.warn("API /pipeline/update failed, since {}", "name is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        pipelineService.update(vo.toPipeline());
        return ApiResult.valueOf(vo.getId());
    }

    /**
     * Looks up a single pipeline by id and returns its view together with the owner's name.
     *
     * @param request the servlet request (unused)
     * @param vo      carries the positive pipeline {@code id}
     * @return the pipeline view, or {@code PARAM_ERROR} when the id is missing or no pipeline
     *         is returned for it
     */
    @PostMapping(value = "/queryById")
    @ResponseBody
    @ApiOperation(value = "查询数据流图", notes = "根据id查询单个数据流图")
    public ApiResult<PipelineVO> queryById(HttpServletRequest request,
                                           @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) PipelineVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /pipeline/queryById failed, since {}", "id is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        PipelineDTO pipeline = pipelineService.queryById(vo.getId());
        if (pipeline == null) {
            logger.warn("API /pipeline/queryById failed, since {}", "pipeline not found");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        // The owner may no longer be resolvable; avoid dereferencing a null user.
        // NOTE(review): assumes PipelineDTO.view tolerates a null user name - confirm.
        UserDTO owner = userService.queryById(pipeline.getUserId());
        PipelineVO ret = pipeline.view(owner == null ? null : owner.getName());

        return ApiResult.valueOf(ret);
    }

    /**
     * Copies the pipeline with the given id.
     *
     * @param request the servlet request (unused)
     * @param vo      carries the positive {@code id} of the pipeline to copy
     * @return the value returned by {@code pipelineService.copy}, or {@code PARAM_ERROR} when
     *         the id is missing
     */
    @PostMapping(value = "/copy")
    @ResponseBody
    @ApiOperation(value = "拷贝数据流图", notes = "拷贝数据流图")
    public ApiResult<Long> copy(HttpServletRequest request,
                                @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) PipelineVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /pipeline/copy failed, since {}", "id is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        Long id = pipelineService.copy(vo.getId());
        return ApiResult.valueOf(id);
    }

    /**
     * Lists the pipelines of a project.
     *
     * <p>Pipelines whose data JSON contains the {@code "published"} key are left out, as are
     * pipelines whose owner cannot be resolved. Pipelines owned by the current user are
     * inserted at the head of the result; all others are appended.
     *
     * @param request the servlet request (unused)
     * @param vo      carries the positive {@code projectId}
     * @return the pipeline views, or {@code PARAM_ERROR} when the project id is missing
     */
    @PostMapping(value = "/queryByProject")
    @ResponseBody
    @ApiOperation(value = "批量查询数据流图", notes = "根据projectId批量查询数据流图")
    public ApiResult<List<PipelineVO>> queryByProject(HttpServletRequest request,
                                                      @RequestBody @ProjectAuth(auth = ProjectAuthEnum.READ) PipelineVO vo) {
        if (vo.getProjectId() == null || vo.getProjectId() <= 0L) {
            logger.warn("API /pipeline/queryByProject failed, since {}", "projectId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        List<PipelineDTO> pipelines = pipelineService.queryByProject(vo.getProjectId());
        List<PipelineVO> ret = new ArrayList<>();
        if (pipelines == null || pipelines.isEmpty()) {
            return ApiResult.valueOf(ret);
        }

        // Pipelines owned by the current user are placed at the head of the list.
        Long currentUserId = JwtUtil.getCurrentUserId();
        Map<Long, UserDTO> ownersById = loadOwnersById(pipelines);

        for (PipelineDTO pipeline : pipelines) {
            JSONObject dataJson = JSONObject.parseObject(pipeline.getDataJson());
            // The visual-build system copies pipelines; "published" is the flag on such copies.
            if (dataJson != null && dataJson.containsKey("published")) {
                continue;
            }
            UserDTO owner = ownersById.get(pipeline.getUserId());
            if (owner == null) {
                // No matching user: the pipeline is not listed.
                continue;
            }
            PipelineVO view = pipeline.view(owner.getName());
            if (currentUserId != null && currentUserId.equals(pipeline.getUserId())) {
                ret.add(0, view);
            } else {
                ret.add(view);
            }
        }
        return ApiResult.valueOf(ret);
    }

    /**
     * Loads the owners of the given pipelines in one batch and indexes them by user id.
     * When the same id appears more than once in the lookup result, the first entry wins.
     */
    private Map<Long, UserDTO> loadOwnersById(List<PipelineDTO> pipelines) {
        Set<Long> ownerIds = new HashSet<>();
        for (PipelineDTO pipeline : pipelines) {
            if (pipeline.getUserId() != null) {
                ownerIds.add(pipeline.getUserId());
            }
        }
        Map<Long, UserDTO> ownersById = new HashMap<>();
        if (ownerIds.isEmpty()) {
            return ownersById;
        }
        List<UserDTO> users = userService.listByIds(ownerIds);
        if (users != null) {
            for (UserDTO user : users) {
                ownersById.putIfAbsent(user.getId(), user);
            }
        }
        return ownersById;
    }

    /**
     * Deletes a pipeline: first its tasks, then the pipeline record itself, inside one
     * transaction.
     *
     * @param request the servlet request (unused)
     * @param vo      carries the positive {@code id} of the pipeline to delete
     * @return {@code SUCCESS}, or {@code PARAM_ERROR} when the id is missing
     */
    @PostMapping(value = "/deleteById")
    @ResponseBody
    @Transactional(rollbackFor = Exception.class)
    @ApiOperation(value = "删除数据流图", notes = "根据id删除单个数据流图")
    public ApiResult<Void> deleteById(HttpServletRequest request,
                                      @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) PipelineVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /pipeline/deleteById failed, since {}", "id is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        pipelineService.deleteTasksByPipeline(vo.getId());
        // Remove the record from the pipeline table.
        pipelineService.delete(vo.getId());
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    /**
     * Executes a pipeline.
     *
     * <p>A full-dose run is always triggered. A sampling run is triggered first when sampling
     * is enabled: the request's {@code sampling} value is used if that key is present,
     * otherwise the {@code app.sampling} property. A missing or null value means no sampling.
     *
     * @param request the servlet request (unused)
     * @param params  JSON body with {@code id} (pipeline id, required), {@code taskId} and an
     *                optional {@code sampling} flag
     * @return the session ids of the triggered runs, or {@code PARAM_ERROR} when the pipeline
     *         id is missing
     */
    @PostMapping(value = "/execute")
    @ResponseBody
    @ApiOperation(value = "执行数据流图", notes = "执行单个数据流图")
    public ApiResult<List<Long>> execute(HttpServletRequest request,
                                         @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) JSONObject params) {
        Long pipelineId = params == null ? null : params.getLong("id");
        if (pipelineId == null || pipelineId <= 0L) {
            logger.warn("API /pipeline/execute failed, since {}", "pipelineId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        Long taskId = pipelineService.traceMostHeadTask(params.getString("taskId"));

        // External callers (e.g. the visual-build system) may choose between sampling and
        // full execution; otherwise the configured default applies.
        boolean samplingRequested = params.containsKey("sampling");
        Boolean sampling;
        if (samplingRequested) {
            sampling = params.getBoolean("sampling");
        } else {
            sampling = environment.getProperty("app.sampling", Boolean.class);
        }

        List<Long> sessionIds = new ArrayList<>();
        // Both sources may yield null; Boolean.TRUE.equals avoids unboxing a null.
        if (Boolean.TRUE.equals(sampling)) {
            sessionIds.add(dagScheduler.triggerSampling(pipelineId, taskId));
        }
        sessionIds.add(dagScheduler.triggerFullDose(pipelineId, taskId, samplingRequested));
        return ApiResult.valueOf(sessionIds);
    }


    /**
     * Queries the status of a pipeline run.
     *
     * @param r      the servlet request (unused)
     * @param params JSON body with a non-empty {@code sessionId}
     * @return the status returned by {@code pipelineService.queryStatus}, or
     *         {@code PARAM_ERROR} when the session id is missing
     */
    @PostMapping(value = "/queryStatus")
    @ResponseBody
    @ApiOperation(value = "查询数据流图实例状态", notes = "查询数据流图实例状态")
    public ApiResult<JSONObject> queryStatus(HttpServletRequest r, @RequestBody JSONObject params) {
        if (params == null || StringUtils.isEmpty(params.getString("sessionId"))) {
            logger.warn("API /pipeline/queryStatus failed, since {}", "sessionId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        return ApiResult.valueOf(pipelineService.queryStatus(params.getString("sessionId")));
    }

    /**
     * Stops one or more pipeline runs.
     *
     * <p>All ids are parsed before any run is stopped, so a malformed id rejects the whole
     * request instead of failing after some runs were already stopped.
     *
     * @param params JSON body whose {@code sessionId} is a comma-separated list of numeric
     *               session ids
     * @return {@code SUCCESS}; {@code PARAM_ERROR} when the list is missing or contains a
     *         non-numeric entry; {@code PIPELINE_STOP_ERROR} with the exception message when
     *         stopping fails
     */
    @PostMapping(value = "/stop")
    @ApiOperation(value = "停止数据流图", notes = "停止单个数据流图")
    public ApiResult<String> stop(
            @RequestBody @ProjectAuth(auth = ProjectAuthEnum.WRITE) JSONObject params) {
        String sessionIdStr = params == null ? null : params.getString("sessionId");
        if (StringUtils.isEmpty(sessionIdStr)) {
            logger.warn("API /pipeline/stop failed, since {}", "sessionId is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }

        List<Long> sessionIds = new ArrayList<>();
        for (String token : sessionIdStr.split(",")) {
            try {
                sessionIds.add(Long.parseLong(token.trim()));
            } catch (NumberFormatException e) {
                logger.warn("API /pipeline/stop failed, since {}", "sessionId is not a number");
                return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
            }
        }

        try {
            for (Long sessionId : sessionIds) {
                dagScheduler.stopPipeline(sessionId);
            }
        } catch (DataScienceException e) {
            logger.warn("API /pipeline/stop failed, since {}", e.getMessage(), e);
            return ApiResult.valueOf(ApiResultCode.PIPELINE_STOP_ERROR, e.getMessage());
        }
        return ApiResult.valueOf(ApiResultCode.SUCCESS, "停止pipeline成功");
    }

    /**
     * Cleans (empties) a pipeline by delegating to {@code pipelineService.clean} inside a
     * transaction.
     *
     * @param vo carries the positive {@code id} of the pipeline to clean
     * @return {@code SUCCESS}, or {@code PARAM_ERROR} when the id is missing
     */
    @PostMapping(value = "/clean")
    @ApiOperation(value = "清空pipeline操作", notes = "清空pipeline操作")
    @Transactional(rollbackFor = Exception.class)
    public ApiResult<Void> clean(@PipelineAuth(field = "id") @RequestBody PipelineVO vo) {
        if (vo.getId() == null || vo.getId() <= 0L) {
            logger.warn("API /pipeline/clean failed, since {}", "id is empty");
            return ApiResult.valueOf(ApiResultCode.PARAM_ERROR);
        }
        pipelineService.clean(vo.getId());
        return ApiResult.valueOf(ApiResultCode.SUCCESS);
    }

    /**
     * Lists every {@link PivotTableMethodEnum} constant as a {@code {name, method}} JSON
     * object.
     *
     * @return a JSON array with one entry per enum constant
     */
    @PostMapping(value = "/queryPivotTableMethod")
    @ResponseBody
    @ApiOperation(value = "获取数据透视统计方式", notes = "获取数据透视统计方式")
    public ApiResult<Object> queryPivotTableMethod() {
        JSONArray ja = new JSONArray();
        for (PivotTableMethodEnum ptme : PivotTableMethodEnum.values()) {
            JSONObject jo = new JSONObject();
            jo.put("name", ptme.getName());
            jo.put("method", ptme.getMethod());
            ja.add(jo);
        }
        return ApiResult.valueOf(ja);
    }
}
